{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Content Planning and Publishing Crew"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This notebook demonstrates how to create an AI crew for planning and publishing content using CrewAI Flows.\n",
    "The flow takes a link to a blog post, downloads its content as Markdown using Firecrawl, analyzes it, generates a Twitter thread or LinkedIn post, and schedules it on Typefully."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Initialization and Setup\n",
    "Import the CrewAI Flow and Crew components and set up the environment."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Importing necessary libraries\n",
    "import getpass\n",
    "import os\n",
    "import datetime\n",
    "import uuid\n",
    "import yaml\n",
    "import json\n",
    "import subprocess\n",
    "from pathlib import Path\n",
    "import pydantic\n",
    "from pydantic import BaseModel\n",
    "from typing import Optional\n",
    "\n",
    "# Firecrawl SDK\n",
    "from firecrawl import FirecrawlApp\n",
    "\n",
    "# Typefully scheduler\n",
    "import scheduler\n",
    "\n",
    "# Importing Crew related components\n",
    "from crewai import Agent, Task, Crew, LLM\n",
    "\n",
    "# Importing CrewAI Flow related components\n",
    "from crewai.flow.flow import Flow, listen, start, router, or_\n",
    "\n",
    "from dotenv import load_dotenv\n",
    "load_dotenv()\n",
    "# Apply a patch to allow nested asyncio loops in Jupyter\n",
    "import nest_asyncio\n",
    "nest_asyncio.apply()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup LLM\n",
    "\n",
    "Make sure Ollama is installed and running on your machine, with the `llama3.2` model available locally."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "llm = LLM(\n",
    "    model=\"ollama/llama3.2\",\n",
    "    base_url=\"http://localhost:11434\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Blog Post URL"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "blog_post_url = \"https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Plan for our Flow\n",
    "\n",
    "1. Scrape the blog post\n",
    "2. Decide where to post using a router\n",
    "3. Kickoff the right **[Crew of Agents]** to prepare a draft ready to publish\n",
    "4. Publish it using Typefully"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<img src=\"content_writing_flow.png\" width=\"1000\" height=\"750\"/>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "execution_count": 22,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from IPython.display import HTML\n",
    "HTML('<img src=\"content_writing_flow.png\" width=\"1000\" height=\"750\"/>')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Twitter Thread Planning Crew\n",
    "\n",
    "The `Tweet` and `Thread` models below capture the output of the planning crew, which is then used to create the Twitter thread and schedule it on Typefully."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [],
   "source": [
    "class Tweet(BaseModel):\n",
    "    \"\"\"Represents an individual tweet in a thread\"\"\"\n",
    "    content: str\n",
    "    is_hook: bool = False  # Identifies if this is the opening/hook tweet\n",
    "    media_urls: Optional[list[str]] = []  # Optional media attachments (images, code snippets)\n",
    "\n",
    "class Thread(BaseModel):\n",
    "    \"\"\"Represents a Twitter thread\"\"\"\n",
    "    topic: str  # Main topic/subject of the thread\n",
    "    tweets: list[Tweet]  # List of tweets in the thread"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [],
   "source": [
    "from crewai_tools import (\n",
    "    DirectoryReadTool,\n",
    "    FileReadTool,\n",
    ")\n",
    "\n",
    "# Load agent and task configurations from YAML files\n",
    "with open('config/planner_agents.yaml', 'r') as f:\n",
    "    agents_config = yaml.safe_load(f)\n",
    "\n",
    "with open('config/planner_tasks.yaml', 'r') as f:\n",
    "    tasks_config = yaml.safe_load(f)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2024-12-19 14:18:07,847 - 7988681536 - __init__.py-__init__:537 - WARNING: Overriding of current TracerProvider is not allowed\n"
     ]
    }
   ],
   "source": [
    "draft_analyzer = Agent(config=agents_config['draft_analyzer'], tools=[\n",
    "    DirectoryReadTool(),\n",
    "    FileReadTool()\n",
    "], llm=llm)\n",
    "twitter_thread_planner = Agent(config=agents_config['twitter_thread_planner'], tools=[\n",
    "    DirectoryReadTool(),\n",
    "    FileReadTool()\n",
    "], llm=llm)\n",
    "\n",
    "analyze_draft = Task(\n",
    "  config=tasks_config['analyze_draft'],\n",
    "  agent=draft_analyzer\n",
    ")\n",
    "create_twitter_thread_plan = Task(\n",
    "  config=tasks_config['create_twitter_thread_plan'],\n",
    "  agent=twitter_thread_planner,\n",
    "  output_pydantic=Thread\n",
    ")\n",
    "\n",
    "planning_crew = Crew(\n",
    "    agents=[draft_analyzer, twitter_thread_planner],\n",
    "    tasks=[analyze_draft, create_twitter_thread_plan],\n",
    "    verbose=False\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
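    "# LinkedIn Post Planning Crew\n",
    "\n",
    "This crew reuses the `draft_analyzer` agent and `analyze_draft` task from the Twitter crew and adds a LinkedIn-specific planner whose output is captured as a `LinkedInPost`."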
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [],
   "source": [
    "class LinkedInPost(BaseModel):\n",
    "    \"\"\"Represents a LinkedIn post\"\"\"\n",
    "    content: str\n",
    "    media_url: str # Main image url for the post"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2024-12-19 14:25:39,453 - 7988681536 - __init__.py-__init__:537 - WARNING: Overriding of current TracerProvider is not allowed\n"
     ]
    }
   ],
   "source": [
    "linkedin_post_planner = Agent(config=agents_config['linkedin_post_planner'], llm=llm)\n",
    "\n",
    "create_linkedin_post_plan = Task(\n",
    "  config=tasks_config['create_linkedin_post_plan'],\n",
    "  agent=linkedin_post_planner,\n",
    "  output_pydantic=LinkedInPost\n",
    ")\n",
    "\n",
    "linkedin_planning_crew = Crew(\n",
    "    agents=[draft_analyzer, linkedin_post_planner],\n",
    "    tasks=[analyze_draft, create_linkedin_post_plan],\n",
    "    verbose=False\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create Content Planning Flow\n",
    "\n",
    "A Flow that plans content for Twitter and LinkedIn, routing to a separate crew for each platform."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<img src=\"content_writing_flow.png\" width=\"1000\" height=\"750\"/>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "execution_count": 37,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from IPython.display import HTML\n",
    "HTML('<img src=\"content_writing_flow.png\" width=\"1000\" height=\"750\"/>')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [],
   "source": [
    "from crewai.flow.flow import Flow, listen, start, router, or_\n",
    "\n",
    "class ContentPlanningState(BaseModel):\n",
    "  \"\"\"\n",
    "  State for the content planning flow\n",
    "  \"\"\"\n",
    "  blog_post_url: str = blog_post_url\n",
    "  draft_path: str = \"workdir/\"  # stored as a string; later set to the scraped draft's file path\n",
    "  post_type: str = \"twitter\"\n",
    "  path_to_example_threads: str = \"workdir/example_threads.txt\"\n",
    "\n",
    "class CreateContentPlanningFlow(Flow[ContentPlanningState]):\n",
    "  # Scrape the blog post  \n",
    "  # No need for AI Agents on this step, so we just use regular Python code\n",
    "  @start()\n",
    "  def scrape_blog_post(self):\n",
    "    print(f\"# fetching draft from: {self.state.blog_post_url}\")\n",
    "    app = FirecrawlApp(api_key=os.getenv(\"FIRECRAWL_API_KEY\"))\n",
    "    scrape_result = app.scrape_url(self.state.blog_post_url, params={'formats': ['markdown', 'html']})\n",
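    "    try:\n",
    "      title = scrape_result['metadata']['title']\n",
    "    except (KeyError, TypeError):\n",
    "      # Fall back to a random name when the scrape result has no title\n",
    "      title = str(uuid.uuid4())\n",
    "    # Make sure the working directory exists before writing the draft\n",
    "    os.makedirs('workdir', exist_ok=True)\n",
    "    self.state.draft_path = f'workdir/{title}.md'\n",
    "    with open(self.state.draft_path, 'w', encoding='utf-8') as f:\n",
    "      f.write(scrape_result['markdown'])\n",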
    "    return self.state\n",
    "\n",
    "  @router(scrape_blog_post)\n",
    "  def select_platform(self):\n",
    "    if self.state.post_type == \"twitter\":\n",
    "      return \"twitter\"\n",
    "    elif self.state.post_type == \"linkedin\":\n",
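    "      return \"linkedin\"\n",
    "    # Fail loudly instead of silently returning None for an unknown platform\n",
    "    raise ValueError(f\"Unsupported post_type: {self.state.post_type!r}\")\n",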
    "\n",
    "  @listen(\"twitter\")\n",
    "  def twitter_draft(self):\n",
    "    print(f\"# Planning content for: {self.state.draft_path}\")\n",
    "    result = planning_crew.kickoff(inputs={'draft_path': self.state.draft_path, 'path_to_example_threads': self.state.path_to_example_threads})\n",
    "    print(f\"# Planned content for {self.state.draft_path}:\")\n",
    "    for tweet in result.pydantic.tweets:\n",
    "        print(f\"    - {tweet.content}\")\n",
    "    return result\n",
    "  \n",
    "  @listen(\"linkedin\")\n",
    "  def linkedin_draft(self):\n",
    "    print(f\"# Planning content for: {self.state.draft_path}\")\n",
    "    result = linkedin_planning_crew.kickoff(inputs={'draft_path': self.state.draft_path})\n",
    "    print(f\"# Planned content for {self.state.draft_path}:\")\n",
    "    print(f\"    - {result.pydantic.content}\")\n",
    "    return result\n",
    "\n",
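    "  @listen(or_(twitter_draft, linkedin_draft))\n",
    "  def save_plan(self, plan):\n",
    "    # Make sure the output directory exists before writing the plan\n",
    "    os.makedirs('thread', exist_ok=True)\n",
    "    # Use the draft's file name without its .md extension\n",
    "    draft_name = Path(self.state.draft_path).stem\n",
    "    with open(f'thread/{draft_name}_{self.state.post_type}.json', 'w') as f:\n",
    "        json.dump(plan.pydantic.model_dump(), f, indent=2)\n",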
    "\n",
    "  @listen(or_(twitter_draft, linkedin_draft))\n",
    "  def publish(self, plan):\n",
    "    print(f\"# Publishing thread for: {self.state.draft_path}\")\n",
    "    ## Schedule for 1 hour from now    \n",
    "    response = scheduler.schedule(\n",
    "        thread_model=plan,\n",
    "        post_type=self.state.post_type\n",
    "    )\n",
    "    print(f\"# Thread scheduled for: {self.state.draft_path}\")\n",
    "    print(f\"Here's the link to scheduled draft: {response['share_url']}\")\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Plot and execute the flow in a Jupyter notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Plot saved as crewai_flow.html\n"
     ]
    }
   ],
   "source": [
    "# Plot the flow; this saves the visualization as crewai_flow.html\n",
    "flow = CreateContentPlanningFlow()\n",
    "flow.plot()\n",
    "\n",
    "# Optionally display the saved visualization inline by uncommenting the IFrame call\n",
    "from IPython.display import IFrame\n",
    "# IFrame(src='./crewai_flow.html', width='100%', height=400)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "ContentPlanningState(blog_post_url='https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag', draft_path='workdir/', post_type='twitter', path_to_example_threads='workdir/example_threads.txt')"
      ]
     },
     "execution_count": 44,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "post_type = \"twitter\"\n",
    "flow = CreateContentPlanningFlow()\n",
    "flow.state.post_type = post_type\n",
    "flow.state"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "# fetching draft from: https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag\n",
      "# Planning content for: workdir/5 Chunking Strategies For RAG - by Avi Chawla.md\n",
      "# Planned content for workdir/5 Chunking Strategies For RAG - by Avi Chawla.md:\n",
      "    - 5 Chunking Strategies For RAG\n",
      "    - What is Retrieval-Augmented Generation (RAG)? 🤔 It's a technique that boosts language models with information retrieval, refining the interaction with large data. Chunking plays a crucial role here!\n",
      "    - 1️⃣ Fixed-size Chunking: This method splits text into uniform segments. While straightforward, it risks losing contextual meaning. Use with care! 🔍\n",
      "    - 2️⃣ Semantic Chunking: Here, we create chunks based on meaningful units (like sentences). This ensures better coherence and relevance in information retrieval. 🤝\n",
      "    - 3️⃣ Recursive Chunking: Breaks text using natural language boundaries, like paragraphs, and refines chunks that are too large. Preserve flow while keeping it efficient! 📏\n",
      "    - 4️⃣ Document Structure-based Chunking: Leverage document headings and sections for chunk boundaries. This maintains logical structure and aids retrieval accuracy! 📚\n",
      "    - 5️⃣ LLM-based Chunking: Use large language models to generate semantically-rich chunks. This is powerful but comes with high computational costs. ⚡️\n",
      "    - Key Takeaway: Each chunking strategy has its own strengths and weaknesses. Select the right method based on your model and content to optimize performance! 🚀\n",
      "    - If you're diving into RAG, embrace chunking. Explore these strategies and elevate your AI's ability to communicate knowledge effectively! Join the discussion! 💬\n",
      "# Publishing thread for: workdir/5 Chunking Strategies For RAG - by Avi Chawla.md\n",
      "######## Thread JSON:  {'topic': '5 Chunking Strategies For RAG', 'tweets': [{'content': '5 Chunking Strategies For RAG', 'is_hook': True, 'media_urls': []}, {'content': \"What is Retrieval-Augmented Generation (RAG)? 🤔 It's a technique that boosts language models with information retrieval, refining the interaction with large data. Chunking plays a crucial role here!\", 'is_hook': False, 'media_urls': ['https://substackcdn.com/image/fetch/w_1456,c_limit,f_auto,q_auto:good,fl_lossy/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2F6878b8fa-5e74-45a1-9a89-5aab92889126_2366x990.gif']}, {'content': '1️⃣ Fixed-size Chunking: This method splits text into uniform segments. While straightforward, it risks losing contextual meaning. Use with care! 🔍', 'is_hook': False, 'media_urls': ['https://substackcdn.com/image/fetch/w_1456,c_limit,f_auto,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2F98c422a0-f0e2-457c-a256-4476a56a601f_943x232.png']}, {'content': '2️⃣ Semantic Chunking: Here, we create chunks based on meaningful units (like sentences). This ensures better coherence and relevance in information retrieval. 🤝', 'is_hook': False, 'media_urls': ['https://substackcdn.com/image/fetch/w_1456,c_limit,f_auto,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2Fa6ad83a6-2879-4c77-9e49-393f16577aef_1066x288.gif']}, {'content': '3️⃣ Recursive Chunking: Breaks text using natural language boundaries, like paragraphs, and refines chunks that are too large. Preserve flow while keeping it efficient! 📏', 'is_hook': False, 'media_urls': ['https://substackcdn.com/image/fetch/w_1456,c_limit,f_auto,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2Ff4009caa-34fc-48d6-8102-3d0f6f2c1386_1066x316.gif']}, {'content': '4️⃣ Document Structure-based Chunking: Leverage document headings and sections for chunk boundaries. This maintains logical structure and aids retrieval accuracy! 📚', 'is_hook': False, 'media_urls': ['https://substackcdn.com/image/fetch/w_1456,c_limit,f_auto,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2Fe8febecd-ee68-42ff-ab06-41a0a3a43cd3_1102x306.gif']}, {'content': '5️⃣ LLM-based Chunking: Use large language models to generate semantically-rich chunks. This is powerful but comes with high computational costs. ⚡️', 'is_hook': False, 'media_urls': ['https://substackcdn.com/image/fetch/w_1456,c_limit,f_auto,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2F4d1b6d60-8956-4030-8525-d899ee61a9d5_1140x198.gif']}, {'content': 'Key Takeaway: Each chunking strategy has its own strengths and weaknesses. Select the right method based on your model and content to optimize performance! 🚀', 'is_hook': False, 'media_urls': []}, {'content': \"If you're diving into RAG, embrace chunking. Explore these strategies and elevate your AI's ability to communicate knowledge effectively! Join the discussion! 💬\", 'is_hook': False, 'media_urls': []}]}\n",
      "Thread scheduled successfully!\n",
      "# Thread scheduled for: workdir/5 Chunking Strategies For RAG - by Avi Chawla.md\n",
      "Here's the link to scheduled draft: https://typefully.com/t/hXG9c1d\n"
     ]
    }
   ],
   "source": [
    "\n",
    "flow.kickoff()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [],
   "source": [
    "flow = CreateContentPlanningFlow()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "ContentPlanningState(blog_post_url='https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag', draft_path='workdir/', post_type='linkedin')"
      ]
     },
     "execution_count": 42,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "flow.state"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.15"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
